/*
 * Copyright 2021-2022 Open Kunlun Technology <https://www.openkunlun.io>
 */

package io.openkunlun.scaladsl.server

import akka.grpc.scaladsl.Metadata
import akka.util.Timeout
import com.google.protobuf.ByteString
import io.grpc.internal.GrpcUtil
import io.openkunlun.scaladsl.serialization.DaprObjectSerialization
import io.openkunlun.scaladsl.util.Strings
import io.openkunlun.scaladsl.v1.BindingEventResponse.BindingEventConcurrency
import io.openkunlun.scaladsl.v1.{ BindingEventRequest, BindingEventResponse }

import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try

/**
 * @author ericxin.
 */
trait BindingHandler extends DaprHandler {
  val name: String
  def handle(req: BindingEventRequest, metadata: Metadata)(implicit ec: ExecutionContext): Future[BindingEventResponse]
}
abstract class BindingService[Req, Resp](implicit m: Manifest[Req]) extends BindingHandler {

  val deadline: Timeout
  val serialization: DaprObjectSerialization

  /**
   *
   * @param req
   * @param metadata
   * @param ec
   * @return
   */
  override final def handle(req: BindingEventRequest, metadata: Metadata)(implicit ec: ExecutionContext): Future[BindingEventResponse] = {
    val dataObj = serialization.deserialize[Req](req.data.toByteArray)
    val request = BindingRequest(req.name, dataObj, req.metadata)

    implicit val timeout: Timeout = Try(metadata.getText(GrpcUtil.TIMEOUT).map(_.toLong.nanos).map(Timeout.apply).getOrElse(deadline)).getOrElse(deadline)
    for {
      resp <- handle(request, metadata)
      result = serialization.serialize[Resp](resp.data)
    } yield BindingEventResponse(
      resp.storeName.getOrElse(""),
      Seq.empty,
      resp.to.filter(Strings.isEmpty),
      ByteString.copyFrom(result),
      if (resp.concurrency.equalsIgnoreCase("parallel")) BindingEventConcurrency.PARALLEL else BindingEventConcurrency.SEQUENTIAL
    )
  }

  /**
   *
   * @param req
   * @param metadata
   * @param ec
   * @return
   */
  def handle(req: BindingRequest[Req], metadata: Metadata)(implicit ec: ExecutionContext, timeout: Timeout): Future[BindingResponse[Resp]]
}
final case class BindingRequest[T](name: String, data: T, metadata: Map[String, String])
final case class BindingResponse[T](data: T, storeName: Option[String] = None, to: Seq[String] = Seq.empty, concurrency: String = "SEQUENTIAL")
